package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.io.PrintStream;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMFile;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ThreadInterruptedException;

/**
 * This class accepts multiple added documents and directly
 * writes a single segment file.  It does this more
 * efficiently than creating a single segment per document
 * (with DocumentWriter) and doing standard merges on those
 * segments.
 *
 * Each added document is passed to the {@link DocConsumer},
 * which in turn processes the document and interacts with
 * other consumers in the indexing chain.  Certain
 * consumers, like {@link StoredFieldsWriter} and {@link
 * TermVectorsTermsWriter}, digest a document and
 * immediately write bytes to the "doc store" files (i.e.,
 * they do not consume RAM per document, except while they
 * are processing the document).
 *
 * Other consumers, e.g. {@link FreqProxTermsWriter} and
 * {@link NormsWriter}, buffer bytes in RAM and flush only
 * when a new segment is produced.
 *
 * Once we have used our allowed RAM buffer, or the number
 * of added docs is large enough (in case we are flushing
 * by doc count instead of RAM usage), we create a real
 * segment and flush it to the Directory.
 *
 * Threads:
 *
 * Multiple threads are allowed into addDocument at once.
 * There is an initial synchronized call to getThreadState
 * which allocates a ThreadState for this thread.  The same
 * thread will get the same ThreadState over time (thread
 * affinity) so that if there are consistent patterns (for
 * example each thread is indexing a different content
 * source) then we make better use of RAM.  Then
 * processDocument is called on that ThreadState without
 * synchronization (most of the "heavy lifting" is in this
 * call).  Finally the synchronized "finishDocument" is
 * called to flush changes to the directory.
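 *
 * A rough sketch of the per-document flow, using the calls
 * as they appear in updateDocument below:
 *
 * <pre>
 *   // synchronized, fast: bind thread, assign docID
 *   state = getThreadState(delTerm, 1);
 *   // NOT synchronized: all the heavy lifting
 *   perDoc = state.consumer.processDocument();
 *   // synchronized, fast: enqueue for in-order writing
 *   finishDocument(state, perDoc);
 * </pre>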
 *
 * When flush is called by IndexWriter we forcefully idle
 * all threads and flush only once they are all idle.  This
 * means you can call flush with a given thread even while
 * other threads are actively adding/deleting documents.
 *
 *
 * Exceptions:
 *
 * Because this class directly updates in-memory posting
 * lists, and flushes stored fields and term vectors
 * directly to files in the directory, there are certain
 * limited times when an exception can corrupt this state.
 * For example, a disk-full condition while flushing stored
 * fields leaves those files in a corrupt state.  Or, an OOM
 * exception while appending to the in-memory posting lists
 * can corrupt that posting list.  We call such exceptions
 * "aborting exceptions".  In these cases we must call
 * abort() to discard all docs added since the last flush.
 *
 * All other exceptions ("non-aborting exceptions") can
 * still partially update the index structures.  These
 * updates are consistent, but they represent only the
 * portion of the document indexed before the exception was
 * hit.  When this happens, we immediately mark the document
 * as deleted so that the document is always atomically
 * ("all or none") added to the index.
 */

final class DocumentsWriter {
    final AtomicLong bytesUsed = new AtomicLong(0);
    IndexWriter writer;
    Directory directory;

    String segment; // Current segment we are working on

    private int nextDocID; // Next docID to be added
    private int numDocs; // # of docs added, but not yet flushed

    // Per-thread indexing states.  The number of instances is
    // capped at maxThreadStates; if there are more threads than
    // that, they share ThreadStates.
    private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
    private final HashMap<Thread, DocumentsWriterThreadState> threadBindings = new HashMap<Thread, DocumentsWriterThreadState>();

    boolean bufferIsFull; // True when it's time to write segment
    private boolean aborting; // True if an abort is pending

    PrintStream infoStream;
    int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
    Similarity similarity;

    // max # simultaneous threads; if there are more than
    // this, they wait for others to finish first
    private final int maxThreadStates;

    // Deletes for our still-in-RAM (to be flushed next) segment
    private BufferedDeletes pendingDeletes = new BufferedDeletes();

    static class DocState {
        DocumentsWriter docWriter;
        Analyzer analyzer;
        int maxFieldLength;
        PrintStream infoStream;
        Similarity similarity;
        int docID;
        Document doc;
        String maxTermPrefix;

        // Only called by asserts
        public boolean testPoint(String name) {
            return docWriter.writer.testPoint(name);
        }

        public void clear() {
            // don't hold onto doc nor analyzer, in case it is
            // largish:
            doc = null;
            analyzer = null;
        }
    }

    /** Consumer returns this on each doc.  This holds any
     *  state that must be flushed in docID order.  We gather
     *  these and flush them in order. */
    abstract static class DocWriter {
        DocWriter next;
        int docID;

        abstract void finish() throws IOException;

        abstract void abort();

        abstract long sizeInBytes();

        void setNext(DocWriter next) {
            this.next = next;
        }
    }

    /**
     * Create and return a new PerDocBuffer.
     */
    PerDocBuffer newPerDocBuffer() {
        return new PerDocBuffer();
    }

    /**
     * RAMFile buffer for DocWriters.
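     * Draws fixed 1 KB blocks (PER_DOC_BLOCK_SIZE) from the
     * shared perDocAllocator and recycles them through
     * recycle() when the per-doc data is no longer needed.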
     */
    class PerDocBuffer extends RAMFile {

        /**
         * Allocate bytes used from shared pool.
         */
        @Override
        protected byte[] newBuffer(int size) {
            assert size == PER_DOC_BLOCK_SIZE;
            return perDocAllocator.getByteBlock();
        }

        /**
         * Recycle the bytes used.
         */
        synchronized void recycle() {
            if (buffers.size() > 0) {
                setLength(0);

                // Recycle the blocks
                perDocAllocator.recycleByteBlocks(buffers);
                buffers.clear();
                sizeInBytes = 0;

                assert numBuffers() == 0;
            }
        }
    }

    /**
     * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
     * which returns the DocConsumer that the DocumentsWriter calls to process the
     * documents. 
     */
    abstract static class IndexingChain {
        abstract DocConsumer getChain(DocumentsWriter documentsWriter);
    }

    static final IndexingChain defaultIndexingChain = new IndexingChain() {

        @Override
        DocConsumer getChain(DocumentsWriter documentsWriter) {
            /*
            This is the current indexing chain:

            DocConsumer / DocConsumerPerThread
              --> code: DocFieldProcessor / DocFieldProcessorPerThread
                --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
                  --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
                    --> code: DocInverter / DocInverterPerThread / DocInverterPerField
                      --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
                        --> code: TermsHash / TermsHashPerThread / TermsHashPerField
                          --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
                            --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
                            --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
                      --> InvertedDocEndConsumer / InvertedDocEndConsumerPerThread / InvertedDocEndConsumerPerField
                        --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
                    --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
            */

            // Build up indexing chain:

            final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
            final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();

            final InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter, new TermsHash(documentsWriter, false, termVectorsWriter, null));
            final NormsWriter normsWriter = new NormsWriter();
            final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
            return new DocFieldProcessor(documentsWriter, docInverter);
        }
    };

    final DocConsumer consumer;

    // The RAM buffer size that triggers a flush is read from
    // this config (see balanceRAM); it is DISABLE_AUTO_FLUSH
    // if we are flushing by doc count instead.

    private final IndexWriterConfig config;

    private boolean closed;
    private final FieldInfos fieldInfos;

    private final BufferedDeletesStream bufferedDeletesStream;
    private final IndexWriter.FlushControl flushControl;

    DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
        this.directory = directory;
        this.writer = writer;
        this.similarity = config.getSimilarity();
        this.maxThreadStates = config.getMaxThreadStates();
        this.fieldInfos = fieldInfos;
        this.bufferedDeletesStream = bufferedDeletesStream;
        flushControl = writer.flushControl;

        consumer = config.getIndexingChain().getChain(this);
        this.config = config;
    }

    // Buffer a specific docID for deletion.  Currently only
    // used when we hit an exception while adding a document
    synchronized void deleteDocID(int docIDUpto) {
        pendingDeletes.addDocID(docIDUpto);
        // NOTE: we do not trigger flush here.  This is
        // potentially a RAM leak, if you have an app that tries
        // to add docs but every single doc always hits a
        // non-aborting exception.  Allowing a flush here gets
        // very messy because we are only invoked when handling
        // exceptions so to do this properly, while handling an
        // exception we'd have to go off and flush new deletes
        // which is risky (likely would hit some other
        // confounding exception).
    }

    boolean deleteQueries(Query... queries) {
        final boolean doFlush = flushControl.waitUpdate(0, queries.length);
        synchronized (this) {
            for (Query query : queries) {
                pendingDeletes.addQuery(query, numDocs);
            }
        }
        return doFlush;
    }

    boolean deleteQuery(Query query) {
        final boolean doFlush = flushControl.waitUpdate(0, 1);
        synchronized (this) {
            pendingDeletes.addQuery(query, numDocs);
        }
        return doFlush;
    }

    boolean deleteTerms(Term... terms) {
        final boolean doFlush = flushControl.waitUpdate(0, terms.length);
        synchronized (this) {
            for (Term term : terms) {
                pendingDeletes.addTerm(term, numDocs);
            }
        }
        return doFlush;
    }

    // TODO: we could check w/ FreqProxTermsWriter: if the
    // term doesn't exist, don't bother buffering into the
    // per-DWPT map (but still must go into the global map)
    boolean deleteTerm(Term term, boolean skipWait) {
        final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
        synchronized (this) {
            pendingDeletes.addTerm(term, numDocs);
        }
        return doFlush;
    }

    public FieldInfos getFieldInfos() {
        return fieldInfos;
    }

    /** If non-null, various details of indexing are printed
     *  here. */
    synchronized void setInfoStream(PrintStream infoStream) {
        this.infoStream = infoStream;
        for (int i = 0; i < threadStates.length; i++) {
            threadStates[i].docState.infoStream = infoStream;
        }
    }

    synchronized void setMaxFieldLength(int maxFieldLength) {
        this.maxFieldLength = maxFieldLength;
        for (int i = 0; i < threadStates.length; i++) {
            threadStates[i].docState.maxFieldLength = maxFieldLength;
        }
    }

    synchronized void setSimilarity(Similarity similarity) {
        this.similarity = similarity;
        for (int i = 0; i < threadStates.length; i++) {
            threadStates[i].docState.similarity = similarity;
        }
    }

    /** Get current segment name we are writing. */
    synchronized String getSegment() {
        return segment;
    }

    /** Returns how many docs are currently buffered in RAM. */
    synchronized int getNumDocs() {
        return numDocs;
    }

    void message(String message) {
        if (infoStream != null) {
            writer.message("DW: " + message);
        }
    }

    synchronized void setAborting() {
        if (infoStream != null) {
            message("setAborting");
        }
        aborting = true;
    }

    /** Called if we hit an exception at a bad time (when
     *  updating the index files) and must discard all
     *  currently buffered docs.  This resets our state,
     *  discarding any docs added since last flush. */
    synchronized void abort() throws IOException {
        if (infoStream != null) {
            message("docWriter: abort");
        }

        boolean success = false;

        try {

            // Forcefully remove waiting ThreadStates from line
            try {
                waitQueue.abort();
            } catch (Throwable t) {
            }

            // Wait for all other threads to finish with
            // DocumentsWriter:
            try {
                waitIdle();
            } finally {
                if (infoStream != null) {
                    message("docWriter: abort waitIdle done");
                }

                assert 0 == waitQueue.numWaiting : "waitQueue.numWaiting=" + waitQueue.numWaiting;
                waitQueue.waitingBytes = 0;

                pendingDeletes.clear();

                for (DocumentsWriterThreadState threadState : threadStates) {
                    try {
                        threadState.consumer.abort();
                    } catch (Throwable t) {
                    }
                }

                try {
                    consumer.abort();
                } catch (Throwable t) {
                }

                // Reset all postings data
                doAfterFlush();
            }

            success = true;
        } finally {
            aborting = false;
            notifyAll();
            if (infoStream != null) {
                message("docWriter: done abort; success=" + success);
            }
        }
    }

    /** Reset after a flush */
    private void doAfterFlush() throws IOException {
        // All ThreadStates should be idle when we are called
        assert allThreadsIdle();
        threadBindings.clear();
        waitQueue.reset();
        segment = null;
        numDocs = 0;
        nextDocID = 0;
        bufferIsFull = false;
        for (int i = 0; i < threadStates.length; i++) {
            threadStates[i].doAfterFlush();
        }
    }

    private synchronized boolean allThreadsIdle() {
        for (int i = 0; i < threadStates.length; i++) {
            if (!threadStates[i].isIdle) {
                return false;
            }
        }
        return true;
    }

    synchronized boolean anyChanges() {
        return numDocs != 0 || pendingDeletes.any();
    }

    // for testing
    public BufferedDeletes getPendingDeletes() {
        return pendingDeletes;
    }

    private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
        // Lock order: DW -> BD
        final long delGen = bufferedDeletesStream.getNextGen();
        if (pendingDeletes.any()) {
            if (segmentInfos.size() > 0 || newSegment != null) {
                final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
                if (infoStream != null) {
                    message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
                }
                bufferedDeletesStream.push(packet);
                if (infoStream != null) {
                    message("flush: delGen=" + packet.gen);
                }
                if (newSegment != null) {
                    newSegment.setBufferedDeletesGen(packet.gen);
                }
            } else {
                if (infoStream != null) {
                    message("flush: drop buffered deletes: no segments");
                }
                // We can safely discard these deletes: since
                // there are no segments, the deletions cannot
                // affect anything.
            }
            pendingDeletes.clear();
        } else if (newSegment != null) {
            newSegment.setBufferedDeletesGen(delGen);
        }
    }

    public boolean anyDeletions() {
        return pendingDeletes.any();
    }

    /** Flush all pending docs to a new segment */
    // Lock order: IW -> DW
    synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {

        final long startTime = System.currentTimeMillis();

        // We change writer's segmentInfos:
        assert Thread.holdsLock(writer);

        waitIdle();

        if (numDocs == 0) {
            // nothing to do!
            if (infoStream != null) {
                message("flush: no docs; skipping");
            }
            // Lock order: IW -> DW -> BD
            pushDeletes(null, segmentInfos);
            return null;
        }

        if (aborting) {
            if (infoStream != null) {
                message("flush: skip because aborting is set");
            }
            return null;
        }

        boolean success = false;

        SegmentInfo newSegment;

        try {
            //System.out.println(Thread.currentThread().getName() + ": nw=" + waitQueue.numWaiting);
            assert nextDocID == numDocs : "nextDocID=" + nextDocID + " numDocs=" + numDocs;
            assert waitQueue.numWaiting == 0 : "numWaiting=" + waitQueue.numWaiting;
            assert waitQueue.waitingBytes == 0;

            if (infoStream != null) {
                message("flush postings as segment " + segment + " numDocs=" + numDocs);
            }

            final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos, numDocs, writer.getConfig().getTermIndexInterval(), pendingDeletes);
            // Apply delete-by-docID now (delete-by-docID only
            // happens when an exception is hit processing that
            // doc, e.g. if the analyzer has some problem w/ the text):
            if (pendingDeletes.docIDs.size() > 0) {
                flushState.deletedDocs = new BitVector(numDocs);
                for (int delDocID : pendingDeletes.docIDs) {
                    flushState.deletedDocs.set(delDocID);
                }
                pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
                pendingDeletes.docIDs.clear();
            }

            newSegment = new SegmentInfo(segment, numDocs, directory, false, true, fieldInfos.hasProx(), false);

            Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
            for (DocumentsWriterThreadState threadState : threadStates) {
                threads.add(threadState.consumer);
            }

            double startMBUsed = bytesUsed() / 1024. / 1024.;

            consumer.flush(threads, flushState);

            newSegment.setHasVectors(flushState.hasVectors);

            if (infoStream != null) {
                message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
                if (flushState.deletedDocs != null) {
                    message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
                }
                message("flushedFiles=" + newSegment.files());
            }

            if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
                final String cfsFileName = IndexFileNames.segmentFileName(segment, IndexFileNames.COMPOUND_FILE_EXTENSION);

                if (infoStream != null) {
                    message("flush: create compound file \"" + cfsFileName + "\"");
                }

                CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
                for (String fileName : newSegment.files()) {
                    cfsWriter.addFile(fileName);
                }
                cfsWriter.close();
                deleter.deleteNewFiles(newSegment.files());
                newSegment.setUseCompoundFile(true);
            }

            // Must write deleted docs after the CFS so we don't
            // slurp the del file into CFS:
            if (flushState.deletedDocs != null) {
                final int delCount = flushState.deletedDocs.count();
                assert delCount > 0;
                newSegment.setDelCount(delCount);
                newSegment.advanceDelGen();
                final String delFileName = newSegment.getDelFileName();
                if (infoStream != null) {
                    message("flush: write " + delCount + " deletes to " + delFileName);
                }
                boolean success2 = false;
                try {
                    // TODO: in the NRT case it'd be better to hand
                    // this del vector over to the
                    // shortly-to-be-opened SegmentReader and let it
                    // carry the changes; there's no reason to use
                    // filesystem as intermediary here.
                    flushState.deletedDocs.write(directory, delFileName);
                    success2 = true;
                } finally {
                    if (!success2) {
                        try {
                            directory.deleteFile(delFileName);
                        } catch (Throwable t) {
                            // suppress this so we keep throwing the
                            // original exception
                        }
                    }
                }
            }

            if (infoStream != null) {
                message("flush: segment=" + newSegment);
                final double newSegmentSizeNoStore = newSegment.sizeInBytes(false) / 1024. / 1024.;
                final double newSegmentSize = newSegment.sizeInBytes(true) / 1024. / 1024.;
                message("  ramUsed=" + nf.format(startMBUsed) + " MB" + " newFlushedSize=" + nf.format(newSegmentSize) + " MB" + " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" + " docs/MB=" + nf.format(numDocs / newSegmentSize) + " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
            }

            success = true;
        } finally {
            notifyAll();
            if (!success) {
                if (segment != null) {
                    deleter.refresh(segment);
                }
                abort();
            }
        }

        doAfterFlush();

        // Lock order: IW -> DW -> BD
        pushDeletes(newSegment, segmentInfos);
        if (infoStream != null) {
            message("flush time " + (System.currentTimeMillis() - startTime) + " msec");
        }

        return newSegment;
    }

    synchronized void close() {
        closed = true;
        notifyAll();
    }

    /** Returns a free (idle) ThreadState that may be used for
     * indexing a document (or a block of docCount documents,
     * whose docIDs are reserved up front).  This call also
     * pauses if a flush is pending.  If delTerm is non-null
     * then we buffer this deleted term after the thread state
     * has been acquired. */
    synchronized DocumentsWriterThreadState getThreadState(Term delTerm, int docCount) throws IOException {

        final Thread currentThread = Thread.currentThread();
        assert !Thread.holdsLock(writer);

        // First, find a thread state.  If this thread already
        // has affinity to a specific ThreadState, use that one
        // again.
        DocumentsWriterThreadState state = threadBindings.get(currentThread);
        if (state == null) {

            // First time this thread has called us since last
            // flush.  Find the least loaded thread state:
            DocumentsWriterThreadState minThreadState = null;
            for (int i = 0; i < threadStates.length; i++) {
                DocumentsWriterThreadState ts = threadStates[i];
                if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
                    minThreadState = ts;
                }
            }
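            // Share the least-loaded existing state only if it is
            // currently unused or we have already hit the
            // maxThreadStates cap; otherwise allocate a new one: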
            if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
                state = minThreadState;
                state.numThreads++;
            } else {
                // Just create a new "private" thread state
                DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1 + threadStates.length];
                if (threadStates.length > 0) {
                    System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
                }
                state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
                threadStates = newArray;
            }
            threadBindings.put(currentThread, state);
        }

        // Next, wait until my thread state is idle (in case
        // it's shared with other threads), and no flush/abort
        // pending 
        waitReady(state);

        // Allocate segment name if this is the first doc since
        // last flush:
        if (segment == null) {
            segment = writer.newSegmentName();
            assert numDocs == 0;
        }

        state.docState.docID = nextDocID;
        nextDocID += docCount;

        if (delTerm != null) {
            pendingDeletes.addTerm(delTerm, state.docState.docID);
        }

        numDocs += docCount;
        state.isIdle = false;
        return state;
    }

    boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
        return updateDocument(doc, analyzer, null);
    }

    boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws CorruptIndexException, IOException {

        // Possibly trigger a flush, or wait until any running flush completes:
        boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);

        // This call is synchronized but fast
        final DocumentsWriterThreadState state = getThreadState(delTerm, 1);

        final DocState docState = state.docState;
        docState.doc = doc;
        docState.analyzer = analyzer;

        boolean success = false;
        try {
            // This call is not synchronized and does all the
            // work
            final DocWriter perDoc;
            try {
                perDoc = state.consumer.processDocument();
            } finally {
                docState.clear();
            }

            // This call is synchronized but fast
            finishDocument(state, perDoc);

            success = true;
        } finally {
            if (!success) {

                // If this thread state had decided to flush, we
                // must clear it so another thread can flush
                if (doFlush) {
                    flushControl.clearFlushPending();
                }

                if (infoStream != null) {
                    message("exception in updateDocument aborting=" + aborting);
                }

                synchronized (this) {

                    state.isIdle = true;
                    notifyAll();

                    if (aborting) {
                        abort();
                    } else {
                        skipDocWriter.docID = docState.docID;
                        boolean success2 = false;
                        try {
                            waitQueue.add(skipDocWriter);
                            success2 = true;
                        } finally {
                            if (!success2) {
                                abort();
                                return false;
                            }
                        }

                        // Immediately mark this document as deleted
                        // since likely it was partially added.  This
                        // keeps indexing as "all or none" (atomic) when
                        // adding a document:
                        deleteDocID(state.docState.docID);
                    }
                }
            }
        }

        doFlush |= flushControl.flushByRAMUsage("new document");

        return doFlush;
    }

    boolean updateDocuments(Collection<Document> docs, Analyzer analyzer, Term delTerm) throws CorruptIndexException, IOException {

        // Possibly trigger a flush, or wait until any running flush completes:
        boolean doFlush = flushControl.waitUpdate(docs.size(), delTerm != null ? 1 : 0);

        final int docCount = docs.size();

        // This call is synchronized but fast -- we allocate the
        // N docIDs up front:
        final DocumentsWriterThreadState state = getThreadState(null, docCount);
        final DocState docState = state.docState;

        final int startDocID = docState.docID;
        int docID = startDocID;

        //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
        for (Document doc : docs) {
            docState.doc = doc;
            docState.analyzer = analyzer;
            // Assign next docID from our block:
            docState.docID = docID++;

            boolean success = false;
            try {
                // This call is not synchronized and does all the
                // work
                final DocWriter perDoc;
                try {
                    perDoc = state.consumer.processDocument();
                } finally {
                    docState.clear();
                }

                // Must call this w/o holding synchronized(this) else
                // we'll hit deadlock:
                balanceRAM();

                // Synchronized but fast
                synchronized (this) {
                    if (aborting) {
                        break;
                    }
                    assert perDoc == null || perDoc.docID == docState.docID;
                    // NOTE: we ignore the pause signal from
                    // waitQueue.add here; pausing is deferred
                    // until the whole doc block has been added
                    // (see below), else we could deadlock:
                    if (perDoc != null) {
                        waitQueue.add(perDoc);
                    } else {
                        skipDocWriter.docID = docState.docID;
                        waitQueue.add(skipDocWriter);
                    }
                }

                success = true;
            } finally {
                if (!success) {
                    //System.out.println(Thread.currentThread().getName() + ": E");

                    // If this thread state had decided to flush, we
                    // must clear it so another thread can flush
                    if (doFlush) {
                        message("clearFlushPending!");
                        flushControl.clearFlushPending();
                    }

                    if (infoStream != null) {
                        message("exception in updateDocuments aborting=" + aborting);
                    }

                    synchronized (this) {

                        state.isIdle = true;
                        notifyAll();

                        if (aborting) {
                            abort();
                        } else {

                            // Fill hole in the doc stores for all
                            // docIDs we pre-allocated
                            //System.out.println(Thread.currentThread().getName() + ": F " + docCount);
                            final int endDocID = startDocID + docCount;
                            docID = docState.docID;
                            while (docID < endDocID) {
                                skipDocWriter.docID = docID++;
                                boolean success2 = false;
                                try {
                                    waitQueue.add(skipDocWriter);
                                    success2 = true;
                                } finally {
                                    if (!success2) {
                                        abort();
                                        return false;
                                    }
                                }
                            }
                            //System.out.println(Thread.currentThread().getName() + ":   F " + docCount + " done");

                            // Mark all pre-allocated docIDs as deleted:
                            docID = startDocID;
                            while (docID < startDocID + docs.size()) {
                                deleteDocID(docID++);
                            }
                        }
                    }
                }
            }
        }

        synchronized (this) {
            // We must delay pausing until the full doc block is
            // added, else we can hit deadlock if more than one
            // thread is adding a block and we need to pause when
            // both are only part way done:
            if (waitQueue.doPause()) {
                waitForWaitQueue();
            }

            //System.out.println(Thread.currentThread().getName() + ":   A " + docCount);

            if (aborting) {

                // We are currently aborting, and another thread is
                // waiting for me to become idle.  We just forcefully
                // idle this threadState; it will be fully reset by
                // abort()
                state.isIdle = true;

                // wakes up any threads waiting on the wait queue
                notifyAll();

                abort();

                if (doFlush) {
                    message("clearFlushPending!");
                    flushControl.clearFlushPending();
                }

                return false;
            }

            // Apply delTerm only after all indexing has
            // succeeded, but apply it only to docs prior to when
            // this batch started:
            if (delTerm != null) {
                pendingDeletes.addTerm(delTerm, startDocID);
            }

            state.isIdle = true;

            // wakes up any threads waiting on the wait queue
            notifyAll();
        }

        doFlush |= flushControl.flushByRAMUsage("new document");

        //System.out.println(Thread.currentThread().getName() + ":   B " + docCount);
        return doFlush;
    }

    public synchronized void waitIdle() {
        while (!allThreadsIdle()) {
            try {
                wait();
            } catch (InterruptedException ie) {
                throw new ThreadInterruptedException(ie);
            }
        }
    }

    synchronized void waitReady(DocumentsWriterThreadState state) {
        while (!closed && (!state.isIdle || aborting)) {
            try {
                wait();
            } catch (InterruptedException ie) {
                throw new ThreadInterruptedException(ie);
            }
        }

        if (closed) {
            throw new AlreadyClosedException("this IndexWriter is closed");
        }
    }

    /** Does the synchronized work to finish/flush the
     *  inverted document. */
    private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {

        // Must call this w/o holding synchronized(this) else
        // we'll hit deadlock:
        balanceRAM();

        synchronized (this) {

            assert docWriter == null || docWriter.docID == perThread.docState.docID;

            if (aborting) {

                // We are currently aborting, and another thread is
                // waiting for me to become idle.  We just forcefully
                // idle this threadState; it will be fully reset by
                // abort()
                if (docWriter != null) {
                    try {
                        docWriter.abort();
                    } catch (Throwable t) {
                    }
                }

                perThread.isIdle = true;

                // wakes up any threads waiting on the wait queue
                notifyAll();

                return;
            }

            final boolean doPause;

            if (docWriter != null) {
                doPause = waitQueue.add(docWriter);
            } else {
                skipDocWriter.docID = perThread.docState.docID;
                doPause = waitQueue.add(skipDocWriter);
            }

            if (doPause) {
                waitForWaitQueue();
            }

            perThread.isIdle = true;

            // wakes up any threads waiting on the wait queue
            notifyAll();
        }
    }

    synchronized void waitForWaitQueue() {
        do {
            try {
                wait();
            } catch (InterruptedException ie) {
                throw new ThreadInterruptedException(ie);
            }
        } while (!waitQueue.doResume());
    }

    private static class SkipDocWriter extends DocWriter {
        @Override
        void finish() {
        }

        @Override
        void abort() {
        }

        @Override
        long sizeInBytes() {
            return 0;
        }
    }

    final SkipDocWriter skipDocWriter = new SkipDocWriter();

    NumberFormat nf = NumberFormat.getInstance();

    /* Chunk size of the shared byte[] blocks used to
     store postings data */
    final static int BYTE_BLOCK_SHIFT = 15;
    final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
    final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
    final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
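    // 1 << 15 = 32 KB per byte block.  The int blocks (8192
    // ints = 32 KB) and char blocks (16384 chars = 32 KB)
    // below have the same footprint; this is the chunk
    // granularity that balanceRAM frees at.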

    private class ByteBlockAllocator extends ByteBlockPool.Allocator {
        final int blockSize;

        ByteBlockAllocator(int blockSize) {
            this.blockSize = blockSize;
        }

        ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();

        /* Allocate another byte[] from the shared pool */
        @Override
        byte[] getByteBlock() {
            synchronized (DocumentsWriter.this) {
                final int size = freeByteBlocks.size();
                final byte[] b;
                if (0 == size) {
                    b = new byte[blockSize];
                    bytesUsed.addAndGet(blockSize);
                } else {
                    b = freeByteBlocks.remove(size - 1);
                }
                return b;
            }
        }

        /* Return byte[]'s to the pool */

        @Override
        void recycleByteBlocks(byte[][] blocks, int start, int end) {
            synchronized (DocumentsWriter.this) {
                for (int i = start; i < end; i++) {
                    freeByteBlocks.add(blocks[i]);
                    blocks[i] = null;
                }
            }
        }

        @Override
        void recycleByteBlocks(List<byte[]> blocks) {
            synchronized (DocumentsWriter.this) {
                final int size = blocks.size();
                for (int i = 0; i < size; i++) {
                    freeByteBlocks.add(blocks.get(i));
                    blocks.set(i, null);
                }
            }
        }
    }

    /* Chunk size of the shared int[] blocks used to
     store postings data */
    final static int INT_BLOCK_SHIFT = 13;
    final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
    final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;

    private List<int[]> freeIntBlocks = new ArrayList<int[]>();

    /* Allocate another int[] from the shared pool */
    synchronized int[] getIntBlock() {
        final int size = freeIntBlocks.size();
        final int[] b;
        if (0 == size) {
            b = new int[INT_BLOCK_SIZE];
            bytesUsed.addAndGet(INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
        } else {
            b = freeIntBlocks.remove(size - 1);
        }
        return b;
    }

    synchronized void bytesUsed(long numBytes) {
        bytesUsed.addAndGet(numBytes);
    }

    long bytesUsed() {
        return bytesUsed.get() + pendingDeletes.bytesUsed.get();
    }

    /* Return int[]s to the pool */
    synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
        for (int i = start; i < end; i++) {
            freeIntBlocks.add(blocks[i]);
            blocks[i] = null;
        }
    }

    ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);

    final static int PER_DOC_BLOCK_SIZE = 1024;

    final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);

    /* Chunk size of the shared char[] blocks used to
     store term text */
    final static int CHAR_BLOCK_SHIFT = 14;
    final static int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
    final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;

    final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE - 1;
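    // = 16383 chars: a term must fit within a single char
    // block, so longer terms cannot be indexed (DocState's
    // maxTermPrefix records the prefix of such a term).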

    private ArrayList<char[]> freeCharBlocks = new ArrayList<char[]>();

    /* Allocate another char[] from the shared pool */
    synchronized char[] getCharBlock() {
        final int size = freeCharBlocks.size();
        final char[] c;
        if (0 == size) {
            bytesUsed.addAndGet(CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
            c = new char[CHAR_BLOCK_SIZE];
        } else {
            c = freeCharBlocks.remove(size - 1);
        }
        // We always track allocations of char blocks, for now,
        // because nothing that skips allocation tracking
        // (currently only term vectors) uses its own char
        // blocks.
        return c;
    }

    /* Return char[]s to the pool */
    synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
        for (int i = 0; i < numBlocks; i++) {
            freeCharBlocks.add(blocks[i]);
            blocks[i] = null;
        }
    }

    String toMB(long v) {
        return nf.format(v / 1024. / 1024.);
    }

    /* We have four pools of RAM: Postings, byte blocks (holds
     * freq/prox posting data), char blocks (holds the
     * characters in the term) and per-doc buffers (stored
     * fields/term vectors).  Different docs require varying
     * amounts of storage from these four pools.
     *
     * For example, docs with many unique single-occurrence
     * short terms will use up the Postings RAM and hardly any
     * of the others, whereas docs with very large terms will
     * use a lot of char block RAM and relatively little of
     * the others.  This method just frees allocations from
     * the pools once we are over-budget, which balances the
     * pools to match the current docs. */
    void balanceRAM() {

        final boolean doBalance;
        final long deletesRAMUsed;

        deletesRAMUsed = bufferedDeletesStream.bytesUsed();

        final long ramBufferSize;
        final double mb = config.getRAMBufferSizeMB();
        if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
            ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
        } else {
            ramBufferSize = (long) (mb * 1024 * 1024);
        }

        synchronized (this) {
            if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
                return;
            }

            doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
        }

        if (doBalance) {

            if (infoStream != null) {
                message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) + " vs trigger=" + toMB(ramBufferSize) + " deletesMB=" + toMB(deletesRAMUsed) + " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size() * BYTE_BLOCK_SIZE) + " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size() * PER_DOC_BLOCK_SIZE) + " charBlockFree=" + toMB(freeCharBlocks.size() * CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR));
            }

            final long startBytesUsed = bytesUsed() + deletesRAMUsed;

            int iter = 0;

            // We free equally from each pool in 32 KB
            // chunks until we are below our threshold
            // (freeLevel)
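            // Each iteration of the loop below frees from one
            // pool, keyed by iter % 5: byte blocks (0), char
            // blocks (1), int blocks (2), up to 32 per-doc
            // blocks (3), and consumer-recycled state (4).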

            boolean any = true;

            final long freeLevel = (long) (0.95 * ramBufferSize);

            while (bytesUsed() + deletesRAMUsed > freeLevel) {

                synchronized (this) {
                    if (0 == perDocAllocator.freeByteBlocks.size() && 0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == freeIntBlocks.size() && !any) {
                        // Nothing else to free -- must flush now.
                        bufferIsFull = bytesUsed() + deletesRAMUsed > ramBufferSize;
                        if (infoStream != null) {
                            if (bytesUsed() + deletesRAMUsed > ramBufferSize) {
                                message("    nothing to free; set bufferIsFull");
                            } else {
                                message("    nothing to free");
                            }
                        }
                        break;
                    }

                    if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
                        byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size() - 1);
                        bytesUsed.addAndGet(-BYTE_BLOCK_SIZE);
                    }

                    if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
                        freeCharBlocks.remove(freeCharBlocks.size() - 1);
                        bytesUsed.addAndGet(-CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
                    }

                    if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
                        freeIntBlocks.remove(freeIntBlocks.size() - 1);
                        bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
                    }

                    if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
                        // Remove upwards of 32 blocks (each block is 1K)
                        for (int i = 0; i < 32; ++i) {
                            perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
                            bytesUsed.addAndGet(-PER_DOC_BLOCK_SIZE);
                            if (perDocAllocator.freeByteBlocks.size() == 0) {
                                break;
                            }
                        }
                    }
                }

                if ((4 == iter % 5) && any) {
                    // Ask consumer to free any recycled state
                    any = consumer.freeRAM();
                }

                iter++;
            }

            if (infoStream != null) {
                message("    after free: freedMB=" + nf.format((startBytesUsed - bytesUsed() - deletesRAMUsed) / 1024. / 1024.) + " usedMB=" + nf.format((bytesUsed() + deletesRAMUsed) / 1024. / 1024.));
            }
        }
    }

    final WaitQueue waitQueue = new WaitQueue();

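    /* Consumers may finish docs out of docID order, but each
     * DocWriter must be written (finish()'d) in docID order.
     * A doc that finishes early parks in the circular buffer
     * "waiting" until the missing earlier docs arrive; see
     * add() below. */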
    private class WaitQueue {
        DocWriter[] waiting;
        int nextWriteDocID;
        int nextWriteLoc;
        int numWaiting;
        long waitingBytes;

        public WaitQueue() {
            waiting = new DocWriter[10];
        }

        synchronized void reset() {
            // NOTE: nextWriteLoc doesn't need to be reset
            assert numWaiting == 0;
            assert waitingBytes == 0;
            nextWriteDocID = 0;
        }

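        // Hysteresis: producers pause once waiting bytes exceed
        // ~10% of the RAM buffer (doPause) and resume once they
        // fall back below ~5% (doResume); fixed 4 MB / 2 MB
        // limits apply when auto-flush is disabled.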
        synchronized boolean doResume() {
            final double mb = config.getRAMBufferSizeMB();
            final long waitQueueResumeBytes;
            if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
                waitQueueResumeBytes = 2 * 1024 * 1024;
            } else {
                waitQueueResumeBytes = (long) (mb * 1024 * 1024 * 0.05);
            }
            return waitingBytes <= waitQueueResumeBytes;
        }

        synchronized boolean doPause() {
            final double mb = config.getRAMBufferSizeMB();
            final long waitQueuePauseBytes;
            if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
                waitQueuePauseBytes = 4 * 1024 * 1024;
            } else {
                waitQueuePauseBytes = (long) (mb * 1024 * 1024 * 0.1);
            }
            return waitingBytes > waitQueuePauseBytes;
        }

        synchronized void abort() {
            int count = 0;
            for (int i = 0; i < waiting.length; i++) {
                final DocWriter doc = waiting[i];
                if (doc != null) {
                    doc.abort();
                    waiting[i] = null;
                    count++;
                }
            }
            waitingBytes = 0;
            assert count == numWaiting;
            numWaiting = 0;
        }

        private void writeDocument(DocWriter doc) throws IOException {
            assert doc == skipDocWriter || nextWriteDocID == doc.docID;
            boolean success = false;
            try {
                doc.finish();
                nextWriteDocID++;
                nextWriteLoc++;
                assert nextWriteLoc <= waiting.length;
                if (nextWriteLoc == waiting.length) {
                    nextWriteLoc = 0;
                }
                success = true;
            } finally {
                if (!success) {
                    setAborting();
                }
            }
        }

        public synchronized boolean add(DocWriter doc) throws IOException {

            assert doc.docID >= nextWriteDocID;

            if (doc.docID == nextWriteDocID) {
                writeDocument(doc);
                while (true) {
                    doc = waiting[nextWriteLoc];
                    if (doc != null) {
                        numWaiting--;
                        waiting[nextWriteLoc] = null;
                        waitingBytes -= doc.sizeInBytes();
                        writeDocument(doc);
                    } else {
                        break;
                    }
                }
            } else {

                // I finished before documents that were added
                // before me.  This can easily happen when I am a
                // small doc and the docs before me were large, or,
                // just due to luck in the thread scheduling.  Just
                // add myself to the queue and when that large doc
                // finishes, it will flush me:
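                // Example: if nextWriteDocID=5, nextWriteLoc=2
                // and doc.docID=8, then gap=3 and this doc parks
                // at waiting[(2 + 3) % waiting.length] until docs
                // 5, 6 and 7 arrive and are written.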
                int gap = doc.docID - nextWriteDocID;
                if (gap >= waiting.length) {
                    // Grow queue
                    DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
                    assert nextWriteLoc >= 0;
                    System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length - nextWriteLoc);
                    System.arraycopy(waiting, 0, newArray, waiting.length - nextWriteLoc, nextWriteLoc);
                    nextWriteLoc = 0;
                    waiting = newArray;
                    gap = doc.docID - nextWriteDocID;
                }

                int loc = nextWriteLoc + gap;
                if (loc >= waiting.length) {
                    loc -= waiting.length;
                }

                // We should only wrap one time
                assert loc < waiting.length;

                // Nobody should be in my spot!
                assert waiting[loc] == null;
                waiting[loc] = doc;
                numWaiting++;
                waitingBytes += doc.sizeInBytes();
            }

            return doPause();
        }
    }
}
